Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent the mqtt establish the second connection (#4594) #4814

Closed
wants to merge 2 commits into from
Closed

Conversation

trankennykhang
Copy link

Required for all PRs:

  • [x ] Signed CLA.

@glinton glinton added fix pr to fix corresponding bug area/mqtt labels Oct 5, 2018
@danielnelson
Copy link
Contributor

@trankennykhang Thanks this looks right but I'm still suspect about our handling of the connected boolean in onConnect. It seems like there is a race condition with the connect function. Do you think we could merge onConnect and connect like:

diff --git a/plugins/inputs/mqtt_consumer/mqtt_consumer.go b/plugins/inputs/mqtt_consumer/mqtt_consumer.go
index 5853ad93..85b7074a 100644
--- a/plugins/inputs/mqtt_consumer/mqtt_consumer.go
+++ b/plugins/inputs/mqtt_consumer/mqtt_consumer.go
@@ -150,27 +150,25 @@ func (m *MQTTConsumer) connect() error {
                return err
        }

-       go m.receiver()
-
-       return nil
-}
-
-func (m *MQTTConsumer) onConnect(c mqtt.Client) {
        log.Printf("I! MQTT Client Connected")
-       if !m.PersistentSession || !m.connected {
+       m.connected = true
+
+       if !m.PersistentSession {
                topics := make(map[string]byte)
                for _, topic := range m.Topics {
                        topics[topic] = byte(m.QoS)
                }
-               subscribeToken := c.SubscribeMultiple(topics, m.recvMessage)
+               subscribeToken := m.client.SubscribeMultiple(topics, m.recvMessage)
                subscribeToken.Wait()
                if subscribeToken.Error() != nil {
                        m.acc.AddError(fmt.Errorf("E! MQTT Subscribe Error\ntopics: %s\nerror: %s",
                                strings.Join(m.Topics[:], ","), subscribeToken.Error()))
                }
-               m.connected = true
        }
-       return
+
+       go m.receiver()
+
+       return nil
 }
 
 func (m *MQTTConsumer) onConnectionLost(c mqtt.Client, err error) {
@@ -271,10 +269,9 @@ func (m *MQTTConsumer) createOpts() (*mqtt.ClientOptions, error) {
                
                opts.AddBroker(server)
        }
-       opts.SetAutoReconnect(true)
+       opts.SetAutoReconnect(false)
        opts.SetKeepAlive(time.Second * 60)
        opts.SetCleanSession(!m.PersistentSession)
-       opts.SetOnConnectHandler(m.onConnect)
        opts.SetConnectionLostHandler(m.onConnectionLost)

        return opts, nil

@danielnelson
Copy link
Contributor

@trankennykhang Based on your change here I reorganized how the mqtt_consumer plugin makes connections. Could you review #4846?

@danielnelson
Copy link
Contributor

This change was included into #4846

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/mqtt fix pr to fix corresponding bug
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants